package com.bizmda.bizsip.sink.api;

import cn.hutool.core.convert.Convert;
import cn.hutool.extra.spring.SpringUtil;
import com.bizmda.bizsip.common.BizCircuitBreaker;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import com.open.capacity.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Records per-Sink success/failure statistics in Redis and drives the circuit-breaker
 * state stored under {@code BizCircuitBreaker.REDIS_STATUS_KEY}.
 *
 * <p>Callers bind an instance to the current thread through
 * {@link #sinkCircuitBreakerThreadLocal} and then report each call's outcome with
 * {@link #success()} or {@link #error()}.
 *
 * @Author: 史正烨
 * @Date: 2022/8/21 10:11
 */
@Slf4j
public class SinkCircuitBreaker {
    /** Breaker bound to the Sink call being processed on the current thread. */
    public static final ThreadLocal<SinkCircuitBreaker> sinkCircuitBreakerThreadLocal = new ThreadLocal<>();
    private final RedisUtil redisUtil;
    private final String sinkId;
    /**
     * Whether circuit breaking is enabled for this Sink; defaults to {@code false}.
     */
    private final boolean enabled;
    /**
     * Length of the statistics rolling window, in seconds; defaults to 10.
     */
    private final long timeInSeconds;
    /**
     * Number of failures within the rolling window required before the circuit may open;
     * defaults to 20.
     */
    private final int requestVolumeThreshold;
    /**
     * Failure percentage at or above which the circuit opens; defaults to 50.
     */
    private final int errorThresholdPercentage;
    /**
     * Duration, in seconds, of the sleep window applied when the circuit opens; defaults to 5.
     */
    private final long sleepWindowInSeconds;
    /** Redis key of the success counter; its expiry marks the end of the rolling window. */
    private final String successKey;
    /** Redis key of the failure counter. */
    private final String errorKey;
    /** Redis key of the marker written, with the sleep-window expiry, when the circuit opens. */
    private final String halfopenKey;

    /**
     * Creates the breaker for one Sink and synchronises its Redis state with the configuration.
     *
     * <p>When the breaker is enabled, the sleep window is published to Redis and any counter or
     * marker key whose remaining lifetime is inconsistent with the configured windows is removed.
     * When it is disabled, the published sleep window and both keys are removed.
     *
     * @param sinkConfig Sink service configuration; its circuit-breaker map supplies
     *                   {@code enabled}, {@code time-in-seconds}, {@code request-volume-threshold},
     *                   {@code error-threshold-percentage} and {@code sleep-window-in-seconds}
     */
    public SinkCircuitBreaker(AbstractSinkConfig sinkConfig) {
        this.redisUtil = SpringUtil.getBean(RedisUtil.class);
        this.sinkId = sinkConfig.getId();
        this.enabled = Convert.toBool(sinkConfig.getCircuitBreakerMap().get("enabled"), false);
        this.timeInSeconds = Convert.toLong(sinkConfig.getCircuitBreakerMap().get("time-in-seconds"), 10L);
        this.requestVolumeThreshold = Convert.toInt(sinkConfig.getCircuitBreakerMap().get("request-volume-threshold"), 20);
        this.errorThresholdPercentage = Convert.toInt(sinkConfig.getCircuitBreakerMap().get("error-threshold-percentage"), 50);
        this.sleepWindowInSeconds = Convert.toLong(sinkConfig.getCircuitBreakerMap().get("sleep-window-in-seconds"), 5L);
        this.successKey = BizCircuitBreaker.PRE_REDIS_SUCCESS_KEY + this.sinkId;
        this.errorKey = BizCircuitBreaker.PRE_REDIS_ERROR_KEY + this.sinkId;
        this.halfopenKey = BizCircuitBreaker.PRE_REDIS_HALFOPEN_KEY + this.sinkId;

        if (this.enabled) {
            log.info("Sink[{}]断路器开启:time-in-seconds-{}秒,request-volume-threshold-{}个,error-threshold-percentage-{}%,sleep-window-in-seconds-{}秒",
                    this.sinkId, this.timeInSeconds, this.requestVolumeThreshold, this.errorThresholdPercentage, this.sleepWindowInSeconds);
            this.redisUtil.hmSet(BizCircuitBreaker.REDIS_SLEEP_WINDOW_IN_SECONDS_KEY,
                    this.sinkId, this.sleepWindowInSeconds);
            deleteIfStale(this.successKey, this.timeInSeconds);
            deleteIfStale(this.halfopenKey, this.sleepWindowInSeconds);
        } else {
            this.redisUtil.hmDel(BizCircuitBreaker.REDIS_SLEEP_WINDOW_IN_SECONDS_KEY,
                    this.sinkId);
            this.redisUtil.del(this.successKey);
            this.redisUtil.del(this.halfopenKey);
        }
    }

    /**
     * Reports a failed call for the Sink whose breaker is bound to the current thread.
     * Does nothing when no breaker is bound or the bound breaker is disabled.
     */
    public static void error() {
        report(false);
    }

    /**
     * Reports a successful call for the Sink whose breaker is bound to the current thread.
     * Does nothing when no breaker is bound or the bound breaker is disabled.
     */
    public static void success() {
        report(true);
    }

    /** Forwards one call outcome to the thread-bound breaker, tolerating an unbound thread. */
    private static void report(boolean succeeded) {
        SinkCircuitBreaker breaker = sinkCircuitBreakerThreadLocal.get();
        if (breaker == null) {
            // ThreadLocal.get() yields null when nothing was bound on this thread; statistics
            // are best-effort, so skip instead of raising a NullPointerException that could
            // hide the caller's own failure.
            log.debug("No SinkCircuitBreaker bound to the current thread; call outcome not recorded");
            return;
        }
        if (breaker.enabled) {
            breaker.setFlag(succeeded);
        }
    }

    /**
     * Deletes {@code key} unless it carries a positive remaining lifetime no longer than
     * {@code maxTtlSeconds}.
     */
    private void deleteIfStale(String key, long maxTtlSeconds) {
        long ttl = this.redisUtil.hasKey(key) ? this.redisUtil.getExpire(key) : 0L;
        // NOTE(review): assumes RedisUtil.getExpire follows Redis TTL semantics, where a key
        // without expiry is reported as a negative value rather than 0 - confirm against
        // RedisUtil. Treating every non-positive value as stale covers both conventions.
        if (ttl <= 0L || ttl > maxTtlSeconds) {
            this.redisUtil.del(key);
        }
    }

    /** Zeroes both counters and starts a new rolling window on the success counter. */
    private void resetCounters() {
        this.redisUtil.set(this.successKey, 0L, this.timeInSeconds);
        this.redisUtil.set(this.errorKey, 0L);
    }

    /** Marks the circuit as open and (re)starts the sleep window. */
    private void openCircuit() {
        this.redisUtil.hmSet(BizCircuitBreaker.REDIS_STATUS_KEY,
                this.sinkId, BizCircuitBreaker.CIRUIT_BREAKER_OPEN);
        this.redisUtil.set(this.halfopenKey, 1, this.sleepWindowInSeconds);
    }

    /**
     * Counts one call outcome and updates the breaker state accordingly.
     *
     * <p>In the closed state the circuit opens once the failure count reaches
     * {@link #requestVolumeThreshold} and the failure percentage reaches
     * {@link #errorThresholdPercentage}. In the open or half-open state a success closes the
     * circuit and resets the counters, while a failure re-opens it for another sleep window.
     *
     * @param flag {@code true} for a successful call, {@code false} for a failed one
     */
    private void setFlag(boolean flag) {
        log.debug("Sink[{}]断路器交易统计:{}", this.sinkId, flag ? "成功交易+1" : "失败交易+1");

        // The success counter's expiry delimits the rolling window: once it is gone, start over.
        if (!this.redisUtil.hasKey(this.successKey)) {
            resetCounters();
        }

        long successCount;
        long errorCount;
        if (flag) {
            successCount = this.redisUtil.incr(this.successKey, 1L);
            errorCount = Convert.toLong(this.redisUtil.get(this.errorKey), 0L);
        } else {
            errorCount = this.redisUtil.incr(this.errorKey, 1L);
            successCount = Convert.toLong(this.redisUtil.get(this.successKey), 0L);
        }

        Integer storedStatus = (Integer) this.redisUtil.hmGet(BizCircuitBreaker.REDIS_STATUS_KEY, this.sinkId);
        if (storedStatus == null) {
            // No state recorded yet for this Sink: initialise it as closed.
            this.redisUtil.hmSet(BizCircuitBreaker.REDIS_STATUS_KEY, this.sinkId, BizCircuitBreaker.CIRUIT_BREAKER_CLOSE);
        }
        // Unbox once so that every comparison below is a numeric one.
        int status = (storedStatus == null) ? BizCircuitBreaker.CIRUIT_BREAKER_CLOSE : storedStatus;

        if (status == BizCircuitBreaker.CIRUIT_BREAKER_CLOSE) {
            long totalCount = successCount + errorCount;
            long percent = (totalCount == 0) ? 0 : errorCount * 100 / totalCount;
            log.debug("Sink[{}]断路器为闭路状态:成功交易数-{},失败交易数-{},失败率-{}%",
                    this.sinkId, successCount, errorCount, percent);
            if (errorCount >= this.requestVolumeThreshold
                    && percent >= this.errorThresholdPercentage) {
                log.warn("Sink({})触发熔断，断路器为开路状态: 成功交易数-{},失败交易数-{},失败率-{}%",
                        this.sinkId, successCount, errorCount, percent);
                openCircuit();
            }
        } else if (status == BizCircuitBreaker.CIRUIT_BREAKER_HALF_OPEN
                || status == BizCircuitBreaker.CIRUIT_BREAKER_OPEN) {
            log.debug("Sink[{}]断路器为{}状态:成功交易数-{},失败交易数-{}",
                    this.sinkId,
                    status == BizCircuitBreaker.CIRUIT_BREAKER_OPEN ? "开路" : "半开路",
                    successCount, errorCount);
            if (flag) {
                log.info("Sink({})请求成功，断路器切换成闭路状态", this.sinkId);
                this.redisUtil.hmSet(BizCircuitBreaker.REDIS_STATUS_KEY,
                        this.sinkId, BizCircuitBreaker.CIRUIT_BREAKER_CLOSE);
                resetCounters();
            } else {
                log.warn("Sink({})请求失败，断路器继续保持为开路状态[{}]秒", this.sinkId, this.sleepWindowInSeconds);
                openCircuit();
            }
        }
    }
}
